Skip to content

DBOS runtime#157

Draft
adrianlyjak wants to merge 8 commits intomainfrom
adrian/dbos
Draft

DBOS runtime#157
adrianlyjak wants to merge 8 commits intomainfrom
adrian/dbos

Conversation

@adrianlyjak
Copy link
Contributor

@adrianlyjak adrianlyjak commented Oct 21, 2025

  • Removes the broker
  • Splits up the context into three separate components, hidden behind the existing ctx interface
  • extends plugins and renames them to runtimes
  • still in progress dbos integration as an alternate runtime

Open with Devin

@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 3 times, most recently from f96df50 to 82c5546 Compare October 22, 2025 18:34
@adrianlyjak adrianlyjak marked this pull request as ready for review October 22, 2025 18:36
@adrianlyjak adrianlyjak force-pushed the adrian/context-refact branch 2 times, most recently from 070fba2 to 862274b Compare October 28, 2025 17:31
Base automatically changed from adrian/context-refact to main October 28, 2025 18:32
@changeset-bot
Copy link

changeset-bot bot commented Jan 15, 2026

⚠️ No Changeset found

Latest commit: 84395fc

Merging this PR will not cause a version bump for any packages. If these changes should not result in a new version, you're good to go. If these changes should result in a version bump, you need to add a changeset.

This PR includes no changesets

When changesets are added to this PR, you'll see the packages that this PR includes changesets for and the associated semver types

Click here to learn what changesets are, and how to add one.

Click here if you're a maintainer who wants to add a changeset to this PR

@coveralls
Copy link

coveralls commented Jan 15, 2026

Pull Request Test Coverage Report for Build 21671598072

Details

  • 515 of 605 (85.12%) changed or added relevant lines in 6 files are covered.
  • 3 unchanged lines in 1 file lost coverage.
  • Overall coverage increased (+0.5%) to 81.874%

Changes Missing Coverage Covered Lines Changed/Added Lines %
packages/llama-agents-dbos/src/llama_agents/dbos/journal/task_journal.py 38 46 82.61%
packages/llama-agents-dbos/src/llama_agents/dbos/runtime.py 239 268 89.18%
packages/llama-agents-dbos/src/llama_agents/dbos/state_store.py 203 256 79.3%
Files with Coverage Reduction New Missed Lines %
workflows/context/state_store.py 3 89.05%
Totals Coverage Status
Change from base Build 21649211948: 0.5%
Covered Lines: 7715
Relevant Lines: 9423

💛 - Coveralls

@adrianlyjak adrianlyjak changed the title WIP: dbos integration with plugins Runtime refactor to support pluggable runtimes Jan 22, 2026
Copy link

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 4 potential issues.

View issues and 14 additional flags in Devin Review.

Open in Devin Review

Comment on lines 1685 to 1713
@property
def status(self) -> Status:
"""Get the current status by inspecting the handler state."""
if not self.run_handler.done():
"""Get the current status by inspecting the terminal event or handler state.

Status is derived from the terminal event type when available:
- WorkflowCancelledEvent -> "cancelled"
- WorkflowTimedOutEvent -> "failed" (timeout is a failure mode)
- WorkflowFailedEvent -> "failed"
- Plain StopEvent -> "completed"

Falls back to checking handler state if no terminal event yet.
"""
# First check if we have a terminal event - derive status from event type
if self._terminal_event is not None:
if isinstance(self._terminal_event, WorkflowCancelledEvent):
return "cancelled"
elif isinstance(self._terminal_event, WorkflowTimedOutEvent):
return "failed"
elif isinstance(self._terminal_event, WorkflowFailedEvent):
return "failed"
else:
return "completed"

# Fall back to handler state check if no terminal event yet
if not self.run_handler.is_done():
return "running"
# done - check if cancelled first
if self.run_handler.cancelled():
return "cancelled"
# then check for exception
exc = self.run_handler.exception()
if exc is not None:
return "failed"
return "completed"
# If handler is done but we don't have a terminal event, it was likely
# cancelled externally or failed before emitting a terminal event
return "running"

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 WorkflowServer status can report "running" even after handler completion if no terminal event was observed

In _WorkflowHandler.status, if no terminal event was recorded and run_handler.is_done() is true, the code returns "running" unconditionally.

Actual behavior: completed/failed/cancelled runs can be reported as "running" in persistence/API if the terminal StopEvent was not observed/recorded by _stream_events for any reason.

Expected behavior: if the handler is done and there is no terminal event, the status should be derived from handler completion state (cancelled vs exception vs completed), not forced to "running".

Code: workflows/server/server.py:1685-1713

Recommendation: When run_handler.is_done() is true and _terminal_event is None, fall back to run_handler.cancelled() / run_handler.exception() to classify as cancelled/failed/completed (or at least "failed"/"cancelled"), rather than returning "running".

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 2 times, most recently from b231c6d to 2574a33 Compare January 22, 2026 22:15
Copy link

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View issue and 19 additional flags in Devin Review.

Open in Devin Review

Comment on lines 184 to 217
def __init__(self) -> None:
self._queues: dict[str, AsyncioAdapterQueues] = {}
self._max_concurrent_runs: weakref.WeakValueDictionary[
str, asyncio.Semaphore
] = weakref.WeakValueDictionary()

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 BasicRuntime concurrency limiting can silently stop working due to WeakValueDictionary semaphore storage

BasicRuntime stores per-workflow semaphores in a weakref.WeakValueDictionary:

self._max_concurrent_runs: weakref.WeakValueDictionary[str, asyncio.Semaphore]

(basic.py:184-188).

Because the only long-lived reference to each semaphore is the weak dictionary entry, semaphores may be garbage-collected at any time when not currently being awaited. When that happens, the next call to _maybe_acquire_max_concurrent_runs() will create a new semaphore, effectively resetting concurrency limits and allowing more than the configured num_concurrent_runs.

Actual: concurrency limit can be bypassed intermittently/non-deterministically.
Expected: concurrency limit should be enforced consistently for the process lifetime (or at least until runtime.destroy()).

Impact: can exceed intended concurrency caps, causing resource exhaustion and incorrect load-shedding behavior.

Recommendation: Use a normal dict (strong refs) for _max_concurrent_runs, and clear it in destroy(); or otherwise keep strong references for semaphore lifetime management.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Copy link

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View issue and 31 additional flags in Devin Review.

Open in Devin Review

Comment on lines 1911 to 1946
async def cancel_handlers_and_tasks(self, *, graceful: bool = True) -> None:
"""Cancel the handler and release it from the store.

Args:
graceful: If True, request graceful cancellation and wait for
WorkflowCancelledEvent. If False, force immediate cancellation
(used for idle release where we don't want to emit cancel event).
"""
if not self.run_handler.is_done():
if graceful:
try:
# Request graceful cancellation - this will emit WorkflowCancelledEvent
await self.run_handler.cancel_run()
except Exception:
pass
try:
# Wait for the workflow to complete after cancellation
# This gives time for WorkflowCancelledEvent to be emitted
await asyncio.wait_for(self.run_handler, timeout=2.0)
except asyncio.TimeoutError:
# Force cancel if graceful cancellation didn't complete in time
self.run_handler.cancel()
except asyncio.CancelledError:
pass
except Exception:
pass
else:
# Force immediate cancellation without waiting
try:
await self.run_handler.cancel_run()
except Exception:
pass
try:
self.run_handler.cancel()
except Exception:
pass

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🟡 Idle release uses graceful cancellation even when graceful=False, contradicting intended semantics

_WorkflowHandler.cancel_handlers_and_tasks(graceful=False) is documented/used for idle release “where we don't want to emit cancel event”, but it still calls await self.run_handler.cancel_run() before hard-cancelling.

Actual: idle release sends a TickCancelRun into the workflow (cancel_run()), which can cause the workflow to emit WorkflowCancelledEvent and transition persisted status to cancelled.
Expected: idle release should stop in-memory execution without changing logical workflow outcome (it should remain resumable/running), or at least avoid emitting cancellation signals.

Code:

  • In non-graceful branch:
await self.run_handler.cancel_run()
...
self.run_handler.cancel()

workflows/server/server.py:1937-1945

Impact: idle release may incorrectly cancel runs instead of just unloading them, breaking resumability and causing incorrect persisted status.

Recommendation: For graceful=False, avoid calling cancel_run(); instead only stop the streaming task and close adapter/resources (or implement a runtime-specific ‘detach/unload’ that doesn’t enqueue TickCancelRun).

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 4 times, most recently from 2b2fe68 to e31a92d Compare January 29, 2026 20:13
@adrianlyjak adrianlyjak changed the base branch from main to adrian/runtime-split January 29, 2026 20:14
@adrianlyjak adrianlyjak changed the title Runtime refactor to support pluggable runtimes DBOS runtime Jan 29, 2026
@adrianlyjak adrianlyjak marked this pull request as draft January 29, 2026 20:14
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 2 times, most recently from bc769c2 to 1a9de11 Compare January 31, 2026 18:10
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 3 times, most recently from 69da0b3 to 9a296b1 Compare February 1, 2026 03:28
@adrianlyjak adrianlyjak changed the base branch from adrian/ticky to adrian/store-interface February 1, 2026 03:38
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 2 times, most recently from 53dab5b to ee2c407 Compare February 2, 2026 05:37
@adrianlyjak adrianlyjak force-pushed the adrian/store-interface branch 2 times, most recently from 6631af8 to 67c0fdc Compare February 2, 2026 20:22
@adrianlyjak adrianlyjak force-pushed the adrian/dbos branch 3 times, most recently from 250d793 to 35194c5 Compare February 2, 2026 22:00
@review-notebook-app
Copy link

Check out this pull request on  ReviewNB

See visual diffs & provide feedback on Jupyter Notebooks.


Powered by ReviewNB

@adrianlyjak adrianlyjak changed the base branch from adrian/store-interface to adrian/ticky February 2, 2026 22:56
Copy link

@devin-ai-integration devin-ai-integration bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Devin Review found 1 new potential issue.

View issue and 36 additional flags in Devin Review.

Open in Devin Review

Comment on lines +519 to +525
async with self._lock:
state, commit_fn = await self._run_sync(_edit_with_lock)
try:
yield state
await self._run_sync(commit_fn, state)
except Exception:
raise

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔴 Database connection leak when exception raised in edit_state context

When an exception is raised inside the edit_state context manager, the database connection is never closed, causing a resource leak.

Click to expand

How the bug is triggered

  1. _edit_with_lock() opens a connection and begins a transaction (lines 492-493)
  2. It returns state, commit_fn where commit_fn contains the cleanup logic (trans.commit() and conn.close())
  3. In the async context manager (lines 519-525):
    • state, commit_fn = await self._run_sync(_edit_with_lock) - connection is opened
    • yield state - user code runs
    • If user code raises an exception, execution jumps to line 524-525 which just re-raises
    • commit_fn is never called, so the connection is never closed

Actual vs Expected

Actual: When user code raises an exception:

async with store.edit_state() as state:
    state["value"] = "modified"
    raise ValueError("intentional error")  # Connection leaks!

The connection remains open and the transaction is left hanging.

Expected: The connection should be rolled back and closed when an exception occurs.

Impact

Database connection pool exhaustion over time, especially in long-running applications with error conditions. This could cause the application to eventually fail to connect to the database.

Recommendation: Add rollback and connection cleanup in the exception handler:

async with self._lock:
    state, commit_fn = await self._run_sync(_edit_with_lock)
    try:
        yield state
        await self._run_sync(commit_fn, state)
    except Exception:
        # Need to rollback and close the connection
        def _rollback_and_close() -> None:
            try:
                trans.rollback()
            finally:
                conn.close()
        await self._run_sync(_rollback_and_close)
        raise

Alternatively, restructure to use a separate cleanup function that's always called, or capture conn and trans in a way that allows cleanup in the except block.

Open in Devin Review

Was this helpful? React with 👍 or 👎 to provide feedback.

Base automatically changed from adrian/ticky to main February 3, 2026 17:50
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants